gen-ai instrumentation(feat): anthropic messages stream method instrumentation#4499
Conversation
There was a problem hiding this comment.
Pull request overview
Adds tracing instrumentation for the Anthropic Python SDK Messages.stream() helper in the opentelemetry-instrumentation-anthropic GenAI instrumentation, including VCR-backed tests and wrapper updates to support the new lifecycle.
Changes:
- Add
Messages.stream()patching and a newMessagesStreamManagerWrapperpath to produce spans for the stream helper. - Refactor stream wrappers to use invocation
stop()/fail()directly (instead of handler stop/fail calls). - Add VCR cassettes + new sync tests for
Messages.stream()success/content/error/user-exception scenarios, and adjust wrapper unit tests.
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| util/opentelemetry-util-genai/src/opentelemetry/util/genai/_inference_invocation.py | Import refactor for get_content_attributes usage. |
| util/opentelemetry-util-genai/src/opentelemetry/util/genai/_agent_invocation.py | Import refactor for get_content_attributes usage. |
| instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/patch.py | Adds messages_stream() patch + shared invocation creation helper. |
| instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/wrappers.py | Updates wrappers to stop/fail via invocation; adds sync manager __enter__ failure handling. |
| instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/init.py | Instruments/uninstruments Messages.stream in addition to Messages.create. |
| instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/test_sync_messages.py | Adds new sync Messages.stream() tests (VCR + error paths). |
| instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/test_async_wrappers.py | Updates wrapper unit tests to match invocation stop/fail API; adds sync-manager enter failure test. |
| instrumentation-genai/opentelemetry-instrumentation-anthropic/tests/cassettes/*.yaml | New VCR recordings for Messages.stream() test cases. |
| instrumentation-genai/opentelemetry-instrumentation-anthropic/CHANGELOG.md | Notes addition of Messages.stream() instrumentation. |
| invocation, capture_content = _create_invocation( | ||
| handler, instance, args, kwargs | ||
| ) | ||
|
|
||
| try: | ||
| return MessagesStreamManagerWrapper( | ||
| wrapped(*args, **kwargs), invocation, capture_content | ||
| ) |
There was a problem hiding this comment.
messages_stream() starts an InferenceInvocation (which immediately starts a span and attaches it to the current context) before the underlying MessageStreamManager is entered. If a caller stores the manager and enters it later—or never enters it—this leaves the span/context attached for longer than the actual request (or indefinitely), which can corrupt parent/child relationships and leak context.
Consider deferring handler.start_inference(...) until the manager wrapper’s __enter__ (e.g., pass the handler + extracted params or a factory into MessagesStreamManagerWrapper and create/start the invocation inside __enter__, failing/stopping it there as needed).
| async def __aenter__( | ||
| self, | ||
| ) -> AsyncMessagesStreamWrapper[ResponseFormatT]: | ||
| msg_stream = await self._manager.__aenter__() | ||
| self._stream_wrapper = AsyncMessagesStreamWrapper( | ||
| msg_stream, | ||
| self._handler, | ||
| self._invocation, | ||
| self._capture_content, | ||
| ) |
There was a problem hiding this comment.
AsyncMessagesStreamManagerWrapper.__aenter__() doesn’t handle exceptions from the wrapped manager’s __aenter__. If __aenter__ raises, the passed-in invocation span remains open/attached because invocation.fail(exc) is never called (unlike the sync manager wrapper).
Wrap the awaited self._manager.__aenter__() in a try/except, call self._invocation.fail(exc), then re-raise the original exception.
lmolkova
left a comment
There was a problem hiding this comment.
Look good, just some implementation-level concerns
| ) | ||
| with self._safe_instrumentation("stop_llm"): | ||
| self.handler.stop_llm(self.invocation) | ||
| with self._safe_instrumentation("invocation stop"): |
There was a problem hiding this comment.
_safe_instrumentation("invocation stop") prevents telemetry cleanup failures from surfacing to users.
_stop() can run during close(), StopIteration, context-manager exit, or response proxy close. If invocation.stop() raises due to instrumentation/exporter/completion-hook behavior, the wrapper should not replace or introduce an application exception. The guard keeps the instrumentation non-invasive: it logs at debug and preserves normal stream behavior. Lmk if you think this is an overkill I can remove it.
There was a problem hiding this comment.
we never do this in other places. instrumentation / exporter / completion hook should never raise. if something fails, 1. it must not be otel code 2. we mist bubble it up
|
Will async be implemented as well or leave that with a separate pr? |
Yes @lzchen it will be in separate pr. We already have one for that. |
There was a problem hiding this comment.
LGTM, just one concern about _safe_instrumentation("invocation stop") - not sure I understand the motivation behind it still.
OTel error handling approach is that you should never expect exception from otel component, so if something throws it's the code not related to otel, and we should be transparent and let it bubble up.
…src/opentelemetry/instrumentation/anthropic/wrappers.py Co-authored-by: Liudmila Molkova <neskazu@gmail.com>
https://github.com/eternalcuriouslearner/opentelemetry-python-contrib into feat/anthropic-messages-stream-method-instrumentation
Description
This PR adds instrumentation to the Anthropic Messages SDK's stream method.
Fixes # (issue)
Type of change
Please delete options that are not relevant.
How Has This Been Tested?
Please describe the tests that you ran to verify your changes. Provide instructions so we can reproduce. Please also list any relevant details for your test configuration
Does This PR Require a Core Repo Change?
Checklist:
See contributing.md for styleguide, changelog guidelines, and more.